package com.infra.common.util;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;

/**
 * Thread pool utilities.
 *
 * <p>Provides a shared default fixed-size pool, a shared cached pool, and lazily created
 * named fixed-size pools. Every thread produced by this class is a daemon thread, so the
 * pools never prevent the JVM from exiting.
 *
 * <p>Example:
 * <pre>
 * ThreadPoolUtils.execute(() -> System.out.println("DEFAULT_FIXED_THREAD_POOL execute"));
 * String taskResult = ThreadPoolUtils.getTaskResult(ThreadPoolUtils.submit(() -> {
 *     System.out.println("CACHED_THREAD_POOL submit");
 *     return "TaskResult";
 * }), "failed");
 * </pre>
 *
 * @author PD
 */
public class ThreadPoolUtils {

    /** Named pools, created on first use and keyed by the trimmed pool name. */
    private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR_MAP = new ConcurrentHashMap<>();

    /**
     * Default fixed-size pool (5 threads, core size == max size, unbounded queue).
     * Used whenever a caller does not name a pool.
     */
    private static final ThreadPoolExecutor DEFAULT_FIXED_THREAD_POOL = createFixedPool("DefaultFixedThreadPool", 5);

    /**
     * Cached pool: 10 core threads, up to {@link Integer#MAX_VALUE} threads in total, direct
     * hand-off through a {@link SynchronousQueue}. Non-core threads idle for 60 seconds are
     * reclaimed, which suits bursts of short-lived tasks.
     */
    private static final ThreadPoolExecutor CACHED_THREAD_POOL = new ThreadPoolExecutor(
            10,
            Integer.MAX_VALUE,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            getThreadFactory("CachedThreadPool")
    );

    static {
        // Start the core threads eagerly so the first submissions do not pay thread start-up cost.
        CACHED_THREAD_POOL.prestartAllCoreThreads();
    }

    /**
     * Builds the thread naming pattern for a pool: {@code <poolName>-%03d}.
     *
     * @param poolName pool name; {@code null} falls back to {@code "ThreadPool"}
     * @return a {@link String#format} pattern with exactly one numeric placeholder
     */
    private static String genThreadFactoryNameFormat(String poolName) {
        if (poolName == null) {
            poolName = "ThreadPool";
        }
        // The result is used as a String.format pattern, so a literal '%' in the pool name
        // must be escaped or thread creation would fail with a format exception.
        return poolName.trim().replace("%", "%%") + "-%03d";
    }

    /**
     * Creates a factory producing daemon threads named {@code <poolName>-NNN}.
     *
     * @param poolName pool name; {@code null} falls back to {@code "RealTimeTaskPool"}
     * @return the thread factory
     */
    public static ThreadFactory getThreadFactory(String poolName) {
        return new BasicThreadFactory.Builder()
                .namingPattern(genThreadFactoryNameFormat(Optional.ofNullable(poolName).orElse("RealTimeTaskPool")))
                .daemon(true)
                .build();
    }

    /**
     * Creates a fixed-size pool backed by an unbounded {@link LinkedBlockingQueue}.
     *
     * @param poolName pool name used for thread naming; {@code null} falls back to
     *                 {@code "DefaultFixedThreadPool"}
     * @param poolSize number of threads; values {@code <= 0} or {@code >= 50} are replaced by 5
     * @return a new executor; the caller owns it (it is not registered in the named-pool map)
     */
    public static ThreadPoolExecutor createFixedPool(String poolName, int poolSize) {
        if (poolName == null) {
            poolName = "DefaultFixedThreadPool";
        }
        if (poolSize <= 0 || poolSize >= 50) {
            poolSize = 5;
        }

        return new ThreadPoolExecutor(
                poolSize,
                poolSize,
                5L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                getThreadFactory(poolName.trim())
        );
    }

    /**
     * Runs a task on the default fixed-size pool.
     *
     * @param runnable task to run; {@code null} is ignored
     */
    public static void execute(Runnable runnable) {
        execute(null, runnable);
    }

    /**
     * Runs a task on the named pool, creating the pool on first use.
     *
     * @param poolName pool name; {@code null} selects the default fixed-size pool
     * @param runnable task to run; {@code null} is ignored
     */
    public static void execute(String poolName, Runnable runnable) {
        if (runnable == null) {
            return;
        }
        getThreadPoolExecutor(poolName).execute(runnable);
    }

    /**
     * Runs a task on the named pool after a delay.
     *
     * <p>The delay is implemented by sleeping inside the pool thread, so the worker is
     * occupied for the whole delay. If the worker is interrupted while waiting, the task is
     * not run.
     *
     * @param poolName   pool name; {@code null} selects the default fixed-size pool
     * @param runnable   task to run; {@code null} is ignored
     * @param delayValue delay amount; {@code null} means 0
     * @param timeUnit   unit of {@code delayValue}; {@code null} means milliseconds
     */
    public static void execute(String poolName, Runnable runnable, Long delayValue, TimeUnit timeUnit) {
        if (runnable == null) {
            // Reject up front instead of failing with an NPE in the worker after the delay.
            return;
        }
        final TimeUnit unit = timeUnit == null ? TimeUnit.MILLISECONDS : timeUnit;
        final long delay = delayValue == null ? 0L : delayValue;
        execute(poolName, () -> {
            try {
                unit.sleep(delay);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor / enclosing code.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
            runnable.run();
        });
    }

    /**
     * Runs a value-returning task on the default fixed-size pool.
     *
     * @param callable task to run
     * @param <T>      result type
     * @return a future for the result; already completed with {@code null} when
     *         {@code callable} is {@code null}
     */
    public static <T> FutureTask<T> execute(Callable<T> callable) {
        return execute(null, callable);
    }

    /**
     * Runs a value-returning task on the named pool and returns a handle to its result.
     *
     * @param poolName pool name; {@code null} selects the default fixed-size pool
     * @param callable task to run
     * @param <T>      result type
     * @return a future for the result; already completed with {@code null} when
     *         {@code callable} is {@code null}
     */
    public static <T> FutureTask<T> execute(String poolName, Callable<T> callable) {
        if (callable == null) {
            return completedNullTask();
        }

        FutureTask<T> task = new FutureTask<>(callable);
        getThreadPoolExecutor(poolName).execute(task);
        return task;
    }

    /**
     * Runs a task on the cached pool.
     *
     * @param runnable task to run
     * @return a future that completes when the task finishes
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    @SuppressWarnings("rawtypes") // raw return type kept so existing callers keep compiling
    public static Future submit(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException("任务不能为空");
        }
        return CACHED_THREAD_POOL.submit(runnable);
    }

    /**
     * Runs a value-returning task on the cached pool.
     *
     * @param callable task to run
     * @param <T>      result type
     * @return a future for the result; already completed with {@code null} when
     *         {@code callable} is {@code null}
     */
    public static <T> FutureTask<T> submit(Callable<T> callable) {
        if (callable == null) {
            return completedNullTask();
        }
        FutureTask<T> task = new FutureTask<>(callable);
        // The task is already a Future; execute() avoids wrapping it in a second FutureTask.
        CACHED_THREAD_POOL.execute(task);
        return task;
    }

    /**
     * Creates a future that has already completed with {@code null}.
     *
     * <p>A {@link FutureTask} only completes once it has been run; returning one that was
     * never run would make every {@code get()} on it block forever.
     */
    private static <T> FutureTask<T> completedNullTask() {
        FutureTask<T> task = new FutureTask<>(() -> null);
        task.run();
        return task;
    }

    /**
     * Looks up (or lazily creates with 5 threads) the named pool.
     *
     * @param poolName pool name; {@code null} selects the default fixed-size pool
     * @return the executor, never {@code null}
     */
    private static ThreadPoolExecutor getThreadPoolExecutor(String poolName) {
        return getThreadPoolExecutor(poolName, 5);
    }

    /**
     * Looks up (or lazily creates) the named pool.
     *
     * @param poolName     pool name; {@code null} selects the default fixed-size pool
     * @param corePoolSize thread count; only honoured when the pool is first created
     * @return the executor, never {@code null}
     */
    private static ThreadPoolExecutor getThreadPoolExecutor(String poolName, int corePoolSize) {
        if (poolName == null) {
            return DEFAULT_FIXED_THREAD_POOL;
        }
        final String key = poolName.trim();
        // Plain get() first: the common "already exists" path stays lock-free.
        ThreadPoolExecutor existing = THREAD_POOL_EXECUTOR_MAP.get(key);
        if (existing != null) {
            return existing;
        }
        // computeIfAbsent creates the pool at most once per key, even under contention.
        return THREAD_POOL_EXECUTOR_MAP.computeIfAbsent(key, name -> createFixedPool(name, corePoolSize));
    }

    /**
     * Waits, in order, for each of the given threads to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining threads are not
     * joined and the interrupt flag is restored.
     *
     * @param threads threads to wait for; a {@code null} array or {@code null} elements are skipped
     */
    public static void join(Thread... threads) {
        if (threads == null) {
            return;
        }
        for (Thread thread : threads) {
            if (thread == null) {
                continue;
            }
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            }
        }
    }

    /**
     * Blocks until the task completes and returns its result.
     *
     * @param future the task handle
     * @param <T>    result type
     * @return the task result, or {@code null} when {@code future} is {@code null}
     * @throws ExecutionException   if the task threw an exception
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    public static <T> T getTaskResult(Future<T> future) throws ExecutionException, InterruptedException {
        if (future == null) {
            return null;
        }

        return future.get();
    }

    /**
     * Blocks until the task completes and returns its result, never throwing.
     *
     * <p>On failure the non-null {@code defaultRet} is returned. When {@code defaultRet} is
     * {@code null}, the failure message is returned instead; this is only type-safe when
     * {@code T} is {@code String} (or a supertype of it) - for any other {@code T} the caller
     * will get a {@link ClassCastException} when using the value, so pass a non-null default.
     *
     * @param future     the task handle
     * @param defaultRet value returned when the task fails, is cancelled, or the wait is interrupted
     * @param <T>        result type
     * @return the task result, the fallback described above, or {@code null} when
     *         {@code future} is {@code null}
     */
    public static <T> T getTaskResult(Future<T> future, T defaultRet) {
        if (future == null) {
            return null;
        }

        try {
            return future.get();
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag for the caller.
            Thread.currentThread().interrupt();
            return fallbackResult(e, defaultRet);
        } catch (Exception e) {
            return fallbackResult(e, defaultRet);
        }
    }

    /**
     * Chooses the value returned by {@link #getTaskResult(Future, Object)} after a failure:
     * the non-null default, else the message of the reflective target exception, else the
     * message of the failure itself.
     */
    @SuppressWarnings("unchecked") // message-as-result contract; only sound when T accepts String
    private static <T> T fallbackResult(Exception failure, T defaultRet) {
        if (defaultRet != null) {
            return defaultRet;
        }

        Throwable cause = failure.getCause();
        if (cause instanceof InvocationTargetException) {
            Throwable target = ((InvocationTargetException) cause).getTargetException();
            if (target != null) {
                return (T) target.getMessage();
            }
        }
        return (T) failure.getMessage();
    }

    /**
     * Puts the calling thread to sleep.
     *
     * <p>If the thread is interrupted, the sleep ends early and the interrupt flag is restored.
     *
     * @param seconds number of seconds to sleep; non-positive values return immediately
     */
    public static void sleep(int seconds) {
        if (seconds <= 0) {
            return;
        }
        try {
            // TimeUnit converts using long arithmetic, so large values cannot overflow int.
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

}
